ROS2通讯总览
机器人运行流程 初始化流程
上塔流程
攀爬流程
下塔流程 待定,与上塔类似
Node解释 相机采集Node(RoboCamNode) 获取3个相机的视频流 作为RobotCamStream 的publisher,发布相 机RGB-D信息(根据订阅的ExecutorStatus中的freeClaw数据来切换相机源)
关节控制Node(RoboJointNode) 通过CAN通讯控制机械臂关节 作为JointControl的Action server,响应其他Node的控制指令,响应其他Node的关节状态数据请求
电爪控制Node(RoboClawNode) 通过485通讯控制机器人电爪 作为ClawControl的Action server,响应主程序Node的控制指令,响应其他Node的电爪状态数据请求
主程序Node(XclimbotNode) 负责机器人的主要运行流程
用户端通讯Node(UINetworkNode) 负责与用户端控制程序进行网络通讯 从主程序Node获取机器人信息,包括机器人运行状态(RobotStatus)和机器人执行器位置(RobotPosition) 获取相机的视频流信息
ROS Topic
RobotCamStream.msg 持续publish视频流信息 msg的具体定义待定
ExecutorStatus.msg 返回当前机器人的执行器状态:
1 2 3 4 5 int8[2 ] claw_status = 0 ~4 int8 fasten_status = 0 ~4 int8 freeClaw = 1 | 2 int8[6 ] joint_status, int64[6 ] joint_angle
RunnningStatus.msg 返回当前机器人的状态
ROS Action
### JointControl.action
Request 请求关节执行一次关节空间运动。
**Note:需要多个关节同步执行指令,同时做动。**
包含六个关节的目标角度:
1 2 int64[6 ] goal_angle int8 vel_limit = 0 ~30
Feedback 为以5Hz频率返回当前关节角度和关节运动状态,表示正在正常执行。
1 2 int64[6 ] current_angle int8[6 ] current_running
Result 用于描述关节运动任务的最终执行结果,仅在任务结束后可获取。
1 2 bool success = true | false string message
ClawControl.action 用于控制机器人电爪运动。
1 2 3 4 5 6 7 8 9 10 11 12 13 int8 index = 1 | 2 int8 grab = 0 | 1 int32 force = 700 int8 index = 1 | 2 int8 running_status = 0 | 1 int8 index = 1 | 2 bool success = true | false string message
PrepareClimb.action 机器人在攀爬结束后两足均抓取于角钢上,此时需要用户发出继续攀爬指令,机器人抬起自由足会运动到拍摄深度图像的位置。
仅当机器人处于status 2(即“等待用户攀爬指令”)时可以发出指令
1 2 3 4 5 6 7 8 9 10 bool prepare = true int64[6 ] current_angle int8[6 ] current_running bool success = true | false string message
VisionSelect.action 用户在用户端点选角钢的坐标数据通过网络发送到用户端通讯Node之后,该Node将坐标信息作为指令目标传给主程序Node,由主程序Node进行图像分析得到抓取点,将mask和抓取点反馈给通讯端Node用于显示。 在用户端程序的视频图像上覆盖显示(半透明)对应的Mask,以及预计抓取点的位置。
这个视觉选择的请求只有当机器人处于status 6 (即“等待用户选取的坐标”)时才有效
1 2 3 4 5 6 7 8 9 10 11 12 int16 point_x = 0 -1280 int16 point_y = 0 -720 bool success = true | false bool [720 ][1280 ] mask int16 result_x = 0 -1280 int16 result_y = 0 -720 string message
DirectJoint.action
直接控制应该为用户端的一个独立窗口(通过菜单栏选项调出)
用户端通讯Node发给主程序Node的指令,可让用户端直接控制关节的运动。
1 2 3 4 5 6 7 8 9 10 11 int64[6 ] goal_angle int8 vel_limit = 0 ~30 int64[6 ] current_angle int8[6 ] current_running bool success = true | false string message
DirectClaw.action
直接控制应该为用户端的一个独立窗口(通过菜单栏选项调出)
用户端通讯Node发给主程序Node的指令,可让用户端直接控制电爪的运动。
1 2 3 4 5 6 7 8 9 10 11 12 13 int8 index = false | true int8 task = 0 | 1 int32 force = 700 int8 index = false | true int8 task = 0 | 1 int8 index = 1 | 2 bool success = true | false string message
ROS Service
### EmergencyStop.srv
用户在用户端点击急停按钮通过网络发送到用户端通讯Node之后,用户端通讯Node将确认信息通过service告知主程序Node
> 急停的具体行为为:cancel joint action, 让固定足保持夹持状态,自由足保持张开
1 2 3 4 5 6 bool stop = true bool success = true | false string message
VisionConfirm.srv 用户端确认抓取点和mask有效与否,将确认信息发回给机器人的用户端通讯Node,用户端通讯Node将确认信息通过service告知主程序Node
这个确认的请求只有当机器人处于status 8 (即“确认抓取结果”)时才有效
1 2 3 4 5 bool confirm = true | false bool received = true
SwitchClaw.srv 用户端点击切换电爪的按钮后,将切换指令发给机器人的用户通讯端Node,用户端通讯Node将确认信息通过service告知主程序Node
仅当机器人处于status 2时可以切换。
1 2 3 4 5 6 bool switch = true bool success = true | false string message
备注 1 使用一个package robot_interfaces 用于存放所有的接口定义
只放接口定义
不含任何 node
被 server / client / pub / sub 共同依赖
例如:
1 2 3 4 5 6 7 8 9 10 11 my_robot_interfaces/ ├── msg/ │ └── RobotCamStream.msg ├── srv/ │ └── RunningStatus.srv │ └── ExecutorStatus.srv │ └── ... └── action/ └── ClawControl.action └── JointControl.action └── ...
2 主程序的核心功能由毕老师实验室团队负责实现。杨总团队需基于主程序框架,在主程序中自行完成本文档所定义的各类 ROS 接口(包括 Action、Service、Topic)的 Server 端实现。 同时,杨总团队需提供与上述接口一一对应的功能测试代码或示例调用代码,以便直接集成或粘贴至主程序中,用于接口联调与功能验证,确保接口定义与实际行为的一致性和可测试性。 典型的主程序python示例为(并非具体实现):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 import rclpyfrom rclpy.node import Nodefrom rclpy.action import ActionServer, GoalResponse, CancelResponsefrom my_robot_interfaces.action import JointControlimport threadingimport timeclass XclimbotMainNode (Node ): def __init__ (self ): super ().__init__('xclimbot_main_node' ) self .current_joint_angle = [0 ] * 6 self .current_joint_running = [0 ] * 6 self ._action_server = ActionServer( self , JointControl, 'joint_control' , execute_callback=self .execute_callback, goal_callback=self .goal_callback, cancel_callback=self .cancel_callback ) self .get_logger().info('JointControl Action Server started' ) def goal_callback (self, goal_request ): self .get_logger().info( f'Received goal: {list (goal_request.goal_angle)} , ' f'vel_limit={goal_request.vel_limit} ' ) if len (goal_request.goal_angle) != 6 : self .get_logger().warn('Invalid goal angle length' ) return GoalResponse.REJECT return GoalResponse.ACCEPT def cancel_callback (self, goal_handle ): self .get_logger().warn('Received cancel request' ) return CancelResponse.ACCEPT def execute_callback (self, goal_handle ): self .get_logger().info('Executing JointControl goal' ) feedback = JointControl.Feedback() result = JointControl.Result() self .current_joint_running = [1 ] * 6 rate = self .create_rate(5 ) for step in range (20 ): if goal_handle.is_cancel_requested: self .get_logger().warn('Goal canceled during execution' ) self .current_joint_running = [0 ] * 6 result.success = False result.message = 'Goal canceled by client' goal_handle.canceled() return result for i in range (6 ): self .current_joint_angle[i] += 1000 feedback.current_angle = self .current_joint_angle feedback.current_running = self .current_joint_running goal_handle.publish_feedback(feedback) self .get_logger().debug( f'Feedback sent: {feedback.current_angle} ' ) rate.sleep() self .current_joint_running = [0 ] * 6 result.success = True result.message = 'Joint motion finished successfully' goal_handle.succeed() self .get_logger().info('JointControl goal succeeded' ) return result def main (args=None ): rclpy.init(args=args) node = XclimbotMainNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__' : main()
测试仅针对接口本身,不涉及对于具体机器人运行逻辑的测试。